use crate::{
    http::router::Http,
    policy,
    test_util::{support::connect::Connect, *},
    Config, Inbound,
};
use hyper::{Body, Request, Response};
use libfuzzer_sys::arbitrary::Arbitrary;
use linkerd_app_core::{
    identity, io,
    proxy::http,
    svc::{self, NewService},
    tls,
    transport::{ClientAddr, OrigDstAddr, Remote, ServerAddr},
    NameAddr, ProxyRuntime,
};
pub use linkerd_app_test as support;
use linkerd_app_test::*;
use std::{fmt, str, sync::Arc};

#[derive(Arbitrary)]
pub struct HttpRequestSpec {
    pub uri: Vec<u8>,
    pub header_name: Vec<u8>,
    pub header_value: Vec<u8>,
    pub http_method: bool,
}

pub async fn fuzz_entry_raw(requests: Vec<HttpRequestSpec>) {
    let server = hyper::server::conn::http1::Builder::new();
    let mut client = hyper::client::conn::http1::Builder::new();
    let connect = support::connect().endpoint_fn_boxed(Target::addr(), hello_fuzz_server(server));
    let profiles = profile::resolver();
    let profile_tx =
        profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap());
    profile_tx.send(profile::Profile::default()).unwrap();

    // Build the outbound server
    let cfg = default_config();
    let (rt, _shutdown) = runtime();
    let server = build_fuzz_server(cfg, rt, profiles, connect).new_service(Target::HTTP1);
    let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await;

    // Now send all of the requests
    for inp in requests.iter() {
        if let Ok(uri) = std::str::from_utf8(&inp.uri[..]) {
            if let Ok(header_name) = std::str::from_utf8(&inp.header_name[..]) {
                if let Ok(header_value) = std::str::from_utf8(&inp.header_value[..]) {
                    let http_method = if inp.http_method {
                        hyper::http::Method::GET
                    } else {
                        hyper::http::Method::POST
                    };

                    if let Ok(req) = Request::builder()
                        .method(http_method)
                        .uri(uri)
                        .header(header_name, header_value)
                        .body(Body::default())
                    {
                        let rsp = client.send_request(req).await;
                        tracing::info!(?rsp);
                        if let Ok(rsp) = rsp {
                            let body = http_util::body_to_string(rsp.into_body()).await;
                            tracing::info!(?body);
                        }
                    }
                }
            }
        }
    }

    // It's okay if the background task returns an error, as this would
    // indicate that the proxy closed the connection --- which it will do on
    // invalid inputs. We want to ensure that the proxy doesn't crash in the
    // face of these inputs, and the background task will panic in this
    // case.
    drop(client);
    let res = bg.join_all().await;
    tracing::info!(?res, "background tasks completed")
}

fn hello_fuzz_server(
    http: hyper::server::conn::http1::Builder,
) -> impl Fn(Remote<ServerAddr>) -> io::Result<io::BoxedIo> {
    move |_endpoint| {
        let (client_io, server_io) = support::io::duplex(4096);
        let hello_svc = hyper::service::service_fn(|_request: Request<Body>| async move {
            Ok::<_, io::Error>(Response::new(Body::from("Hello world!")))
        });
        tokio::spawn(
            http.serve_connection(server_io, hello_svc)
                .in_current_span(),
        );
        Ok(io::BoxedIo::new(client_io))
    }
}

fn build_fuzz_server<I>(
    cfg: Config,
    rt: ProxyRuntime,
    profiles: resolver::Profiles,
    connect: Connect<Remote<ServerAddr>>,
) -> svc::ArcNewTcp<Target, I>
where
    I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + Send + Unpin + 'static,
{
    let connect = svc::stack(connect)
        .push_map_target(|t: Http| svc::Param::<Remote<ServerAddr>>::param(&t))
        .into_inner();
    Inbound::new(cfg, rt)
        .with_stack(connect)
        .push_http_router(profiles)
        .push_http_server()
        .push_http_tcp_server()
        .into_inner()
}

impl fmt::Debug for HttpRequestSpec {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Custom `Debug` impl that formats the URI, header name, and header
        // value as strings if they are UTF-8, or falls back to raw bytes
        // otherwise.
        let mut dbg = f.debug_struct("HttpRequestSpec");
        dbg.field("http_method", &self.http_method);

        if let Ok(uri) = str::from_utf8(&self.uri[..]) {
            dbg.field("uri", &uri);
        } else {
            dbg.field("uri", &self.uri);
        }

        if let Ok(name) = str::from_utf8(&self.header_name[..]) {
            dbg.field("header_name", &name);
        } else {
            dbg.field("header_name", &self.header_name);
        }

        if let Ok(value) = str::from_utf8(&self.header_value[..]) {
            dbg.field("header_value", &value);
        } else {
            dbg.field("header_value", &self.header_value);
        }

        dbg.finish()
    }
}

#[derive(Clone, Debug)]
struct Target(http::Variant);

// === impl Target ===

impl Target {
    const HTTP1: Self = Self(http::Variant::Http1);

    fn addr() -> SocketAddr {
        ([127, 0, 0, 1], 80).into()
    }
}

impl svc::Param<OrigDstAddr> for Target {
    fn param(&self) -> OrigDstAddr {
        OrigDstAddr(Self::addr())
    }
}

impl svc::Param<Remote<ServerAddr>> for Target {
    fn param(&self) -> Remote<ServerAddr> {
        Remote(ServerAddr(Self::addr()))
    }
}

impl svc::Param<Remote<ClientAddr>> for Target {
    fn param(&self) -> Remote<ClientAddr> {
        Remote(ClientAddr(([192, 0, 2, 3], 50000).into()))
    }
}

impl svc::Param<http::Variant> for Target {
    fn param(&self) -> http::Variant {
        self.0
    }
}

impl svc::Param<tls::ConditionalServerTls> for Target {
    fn param(&self) -> tls::ConditionalServerTls {
        tls::ConditionalServerTls::None(tls::NoServerTls::NoClientHello)
    }
}

impl svc::Param<policy::AllowPolicy> for Target {
    fn param(&self) -> policy::AllowPolicy {
        let (policy, _) = policy::AllowPolicy::for_test(
            self.param(),
            policy::ServerPolicy {
                protocol: policy::Protocol::Http1(Arc::new([
                    linkerd_proxy_server_policy::http::default(Arc::new([policy::Authorization {
                        authentication: policy::Authentication::Unauthenticated,
                        networks: vec![std::net::IpAddr::from([192, 0, 2, 3]).into()],
                        meta: Arc::new(policy::Meta::Resource {
                            group: "policy.linkerd.io".into(),
                            kind: "server".into(),
                            name: "testsaz".into(),
                        }),
                    }])),
                ])),
                meta: Arc::new(policy::Meta::Resource {
                    group: "policy.linkerd.io".into(),
                    kind: "server".into(),
                    name: "testsrv".into(),
                }),
                local_rate_limit: Arc::new(linkerd_proxy_server_policy::LocalRateLimit::default()),
            },
        );
        policy
    }
}

impl svc::Param<policy::ServerLabel> for Target {
    fn param(&self) -> policy::ServerLabel {
        policy::ServerLabel(
            Arc::new(policy::Meta::Resource {
                group: "policy.linkerd.io".into(),
                kind: "server".into(),
                name: "testsrv".into(),
            }),
            1000,
        )
    }
}

impl svc::Param<http::normalize_uri::DefaultAuthority> for Target {
    fn param(&self) -> http::normalize_uri::DefaultAuthority {
        http::normalize_uri::DefaultAuthority(None)
    }
}

impl svc::Param<Option<identity::Id>> for Target {
    fn param(&self) -> Option<identity::Id> {
        None
    }
}
